package com.jingdianjichi.subject.infra.basic.es;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.http.HttpHost;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.*;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.sort.ScoreSortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.*;

/**
 * 自定义的ES工具类
 */
@Component
@Slf4j
public class EsRestClient {

    // 用于存储不同Elasticsearch集群客户端的映射
    public static Map<String, RestHighLevelClient> clientMap = new HashMap<>();

    @Resource
    private EsConfigProperties esConfigProperties; // 自动注入Elasticsearch配置属性

    private static final RequestOptions COMMON_OPTIONS; // 通用的请求选项

    static {
        // 初始化通用请求选项
        RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
        COMMON_OPTIONS = builder.build();
    }

    // 在Bean的初始化之后执行此方法，用于初始化连接Elasticsearch集群的客户端。
    @PostConstruct
    public void initialize() {
        // 从配置文件中读取Elasticsearch集群的配置信息。
        List<EsClusterConfig> esConfigs = esConfigProperties.getEsConfigs();
        for (EsClusterConfig esConfig : esConfigs) {
            // 记录日志信息，显示正在初始化的Elasticsearch集群名称和节点信息。
            log.info("initialize.config.name:{},node:{}", esConfig.getName(), esConfig.getNodes());
            // 调用下面的方法初始化RestHighLevelClient客户端。
            RestHighLevelClient restHighLevelClient = initRestClient(esConfig);
            if (restHighLevelClient != null) {
                // 如果客户端初始化成功，则将其存入一个静态Map中，键为集群名称，值为对应的客户端实例。
                clientMap.put(esConfig.getName(), restHighLevelClient);
            } else {
                // 如果客户端初始化失败，则记录错误日志。
                log.error("config.name:{},node:{}.initError", esConfig.getName(), esConfig.getNodes());
            }
        }
    }

    // 根据Elasticsearch集群配置初始化RestHighLevelClient客户端的方法。
    private RestHighLevelClient initRestClient(EsClusterConfig esClusterConfig) {
        // 将配置中的节点信息（ip:port格式的字符串）分割成数组。
        String[] ipPortArr = esClusterConfig.getNodes().split(",");
        List<HttpHost> httpHostList = new ArrayList<>(ipPortArr.length);
        for (String ipPort : ipPortArr) {
            // 再将每个节点信息分割成IP和端口。
            String[] ipPortInfo = ipPort.split(":");
            if (ipPortInfo.length == 2) {
                // 根据IP和端口创建HttpHost对象，并加入到列表中。
                HttpHost httpHost = new HttpHost(ipPortInfo[0], NumberUtils.toInt(ipPortInfo[1]));
                httpHostList.add(httpHost);
            }
        }
        // 将List转换成HttpHost数组。
        HttpHost[] httpHosts = new HttpHost[httpHostList.size()];
        httpHostList.toArray(httpHosts);

        // 使用HttpHost数组创建RestClientBuilder。
        RestClientBuilder builder = RestClient.builder(httpHosts);
        // 使用RestClientBuilder创建RestHighLevelClient客户端实例。
        RestHighLevelClient restHighLevelClient = new RestHighLevelClient(builder);
        return restHighLevelClient;
    }


    // 根据集群名称获取Elasticsearch客户端实例
    private static RestHighLevelClient getClient(String clusterName) {
        // 从clientMap中根据集群名称获取对应的RestHighLevelClient实例
        return clientMap.get(clusterName);
    }

    // 向Elasticsearch插入文档的方法
    public static boolean insertDoc(EsIndexInfo esIndexInfo, EsSourceData esSourceData) {
        try {
            // 创建一个索引请求，指定索引名
            IndexRequest indexRequest = new IndexRequest(esIndexInfo.getIndexName());
            // 设置要插入的文档内容
            indexRequest.source(esSourceData.getData());
            // 设置文档ID
            indexRequest.id(esSourceData.getDocId());
            // 获取对应集群的客户端并执行索引请求，使用通用的请求选项
            getClient(esIndexInfo.getClusterName()).index(indexRequest, COMMON_OPTIONS);
            // 如果执行无异常，返回true表示插入成功
            return true;
        } catch (Exception e) {
            // 记录异常信息
            log.error("insertDoc.exception:{}", e.getMessage(), e);
        }
        // 出现异常返回false表示插入失败
        return false;
    }

    // 更新Elasticsearch中的文档
    public static boolean updateDoc(EsIndexInfo esIndexInfo, EsSourceData esSourceData) {
        try {
            // 创建一个更新请求，指定索引名和文档ID
            UpdateRequest updateRequest = new UpdateRequest();
            updateRequest.index(esIndexInfo.getIndexName());
            updateRequest.id(esSourceData.getDocId());
            // 设置要更新的文档内容
            updateRequest.doc(esSourceData.getData());
            // 获取对应集群的客户端并执行更新请求，使用通用的请求选项
            getClient(esIndexInfo.getClusterName()).update(updateRequest, COMMON_OPTIONS);
            // 如果执行无异常，返回true表示更新成功
            return true;
        } catch (Exception e) {
            // 记录异常信息
            log.error("updateDoc.exception:{}", e.getMessage(), e);
        }
        // 出现异常返回false表示更新失败
        return false;
    }

    // 批量更新文档的方法
    public static boolean batchUpdateDoc(EsIndexInfo esIndexInfo, List<EsSourceData> esSourceDataList) {
        try {
            boolean flag = false; // 用于标记是否有文档被添加到批处理请求中
            BulkRequest bulkRequest = new BulkRequest(); // 创建批量请求对象
            // 遍历要更新的文档数据列表
            for (EsSourceData esSourceData : esSourceDataList) {
                String docId = esSourceData.getDocId(); // 获取每个文档的ID
                // 如果文档ID不为空
                if (StringUtils.isNotBlank(docId)) {
                    UpdateRequest updateRequest = new UpdateRequest(); // 创建更新请求
                    updateRequest.index(esIndexInfo.getIndexName()); // 设置索引名
                    updateRequest.id(docId); // 设置文档ID
                    updateRequest.doc(esSourceData.getData()); // 设置要更新的内容
                    bulkRequest.add(updateRequest); // 将更新请求添加到批量请求中
                    flag = true; // 标记已有文档被添加到批处理请求
                }
            }

            // 如果有文档被添加到批处理请求
            if (flag) {
                BulkResponse bulk = getClient(esIndexInfo.getClusterName()).bulk(bulkRequest, COMMON_OPTIONS); // 执行批量请求
                if (bulk.hasFailures()) { // 如果批量操作中有失败
                    return false; // 返回false表示批量更新失败
                }
            }

            return true; // 返回true表示批量更新成功
        } catch (Exception e) {
            log.error("batchUpdateDoc.exception:{}", e.getMessage(), e); // 记录异常信息
        }
        return false; // 出现异常返回false表示批量更新失败
    }

    // 删除索引中的所有文档
    public static boolean delete(EsIndexInfo esIndexInfo) {
        try {
            DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(esIndexInfo.getIndexName()); // 创建删除请求，指定索引名
            deleteByQueryRequest.setQuery(QueryBuilders.matchAllQuery()); // 设置查询条件为匹配所有文档
            BulkByScrollResponse response = getClient(esIndexInfo.getClusterName())
                    .deleteByQuery(deleteByQueryRequest, COMMON_OPTIONS); // 执行删除操作
            long deleted = response.getDeleted(); // 获取被删除的文档数量
            log.info("deleted.size:{}", deleted); // 记录删除的文档数量
            return true; // 返回true表示删除成功
        } catch (Exception e) {
            log.error("delete.exception:{}", e.getMessage(), e); // 记录异常信息
        }
        return false; // 出现异常返回false表示删除失败
    }


    // 删除指定ID的文档方法
    public static boolean deleteDoc(EsIndexInfo esIndexInfo, String docId) {
        try {
            DeleteRequest deleteRequest = new DeleteRequest(esIndexInfo.getIndexName()); // 创建删除请求，并指定索引名称
            deleteRequest.id(docId); // 设置要删除的文档ID
            DeleteResponse response = getClient(esIndexInfo.getClusterName()).delete(deleteRequest, COMMON_OPTIONS); // 执行删除操作，并获取响应
            log.info("deleteDoc.response:{}", JSON.toJSONString(response)); // 记录删除操作的响应信息
            return true; // 如果操作成功完成，返回true
        } catch (Exception e) {
            log.error("deleteDoc.exception:{}", e.getMessage(), e); // 如果操作过程中出现异常，记录异常信息
        }
        return false; // 如果操作失败或出现异常，返回false
    }

    // 检查指定ID的文档是否存在方法
    public static boolean isExistDocById(EsIndexInfo esIndexInfo, String docId) {
        try {
            GetRequest getRequest = new GetRequest(esIndexInfo.getIndexName()); // 创建一个获取请求，并指定索引名称
            getRequest.id(docId); // 设置要检查的文档ID
            return getClient(esIndexInfo.getClusterName()).exists(getRequest, COMMON_OPTIONS); // 执行检查操作，如果文档存在返回true，否则返回false
        } catch (Exception e) {
            log.error("isExistDocById.exception:{}", e.getMessage(), e); // 如果操作过程中出现异常，记录异常信息
        }
        return false; // 如果操作失败或出现异常，返回false
    }


    // 通过ID获取文档内容
    public static Map<String, Object> getDocById(EsIndexInfo esIndexInfo, String docId) {
        try {
            GetRequest getRequest = new GetRequest(esIndexInfo.getIndexName()); // 创建获取请求，指定索引名称
            getRequest.id(docId); // 设置要获取的文档ID
            GetResponse response = getClient(esIndexInfo.getClusterName()).get(getRequest, COMMON_OPTIONS); // 执行获取操作，并获取响应
            Map<String, Object> source = response.getSource(); // 从响应中提取文档内容
            return source; // 返回文档内容
        } catch (Exception e) {
            log.error("getDocById.exception:{}", e.getMessage(), e); // 如果操作过程中出现异常，记录异常信息
        }
        return null; // 如果操作失败或出现异常，返回null
    }

    // 通过ID获取文档的指定字段
    public static Map<String, Object> getDocById(EsIndexInfo esIndexInfo, String docId, String[] fields) {
        try {
            GetRequest getRequest = new GetRequest(esIndexInfo.getIndexName()); // 创建获取请求，指定索引名称
            getRequest.id(docId); // 设置要获取的文档ID
            FetchSourceContext fetchSourceContext = new FetchSourceContext(true, fields, null); // 创建字段过滤上下文，指定要获取的字段
            getRequest.fetchSourceContext(fetchSourceContext); // 设置请求的字段过滤上下文
            GetResponse response = getClient(esIndexInfo.getClusterName()).get(getRequest, COMMON_OPTIONS); // 执行获取操作，并获取响应
            Map<String, Object> source = response.getSource(); // 从响应中提取文档内容
            return source; // 返回文档内容
        } catch (Exception e) {
            log.error("getDocById.exception:{}", e.getMessage(), e); // 如果操作过程中出现异常，记录异常信息
        }
        return null; // 如果操作失败或出现异常，返回null
    }


    // 使用Term查询进行搜索
    public static SearchResponse searchWithTermQuery(EsIndexInfo esIndexInfo, EsSearchRequest esSearchRequest) {
        try {
            BoolQueryBuilder bq = esSearchRequest.getBq(); // 获取布尔查询构建器
            String[] fields = esSearchRequest.getFields(); // 获取需要检索的字段
            int from = esSearchRequest.getFrom(); // 获取查询的起始位置（分页用）
            int size = esSearchRequest.getSize(); // 获取查询的大小（即每页显示的记录数）
            Long minutes = esSearchRequest.getMinutes(); // 获取滚动搜索的持续时间
            Boolean needScroll = esSearchRequest.getNeedScroll(); // 判断是否需要使用滚动搜索
            String sortName = esSearchRequest.getSortName(); // 获取排序字段名
            SortOrder sortOrder = esSearchRequest.getSortOrder(); // 获取排序方式（升序或降序）

            // 创建搜索源构建器，并设置查询条件、检索字段、分页信息等
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
            searchSourceBuilder.query(bq); // 设置查询条件
            searchSourceBuilder.fetchSource(fields, null).from(from).size(size); // 设置检索的字段和分页信息

            // 如果有高亮设置，添加高亮配置
            if (Objects.nonNull(esSearchRequest.getHighlightBuilder())) {
                searchSourceBuilder.highlighter(esSearchRequest.getHighlightBuilder());
            }

            // 如果有排序字段名，添加排序配置
            if (StringUtils.isNotBlank(sortName)) {
                searchSourceBuilder.sort(sortName);
            }

            // 默认按评分降序排序
            searchSourceBuilder.sort(new ScoreSortBuilder().order(SortOrder.DESC));

            // 创建搜索请求，设置搜索类型、索引名和搜索源
            SearchRequest searchRequest = new SearchRequest();
            searchRequest.searchType(SearchType.DEFAULT); // 设置搜索类型
            searchRequest.indices(esIndexInfo.getIndexName()); // 设置索引名
            searchRequest.source(searchSourceBuilder); // 设置搜索源

            // 如果需要滚动搜索，设置滚动时间
            if (needScroll) {
                Scroll scroll = new Scroll(TimeValue.timeValueMinutes(minutes));
                searchRequest.scroll(scroll);
            }

            // 执行搜索并返回搜索响应
            SearchResponse search = getClient(esIndexInfo.getClusterName()).search(searchRequest, COMMON_OPTIONS);
            return search;
        } catch (Exception e) {
            log.error("searchWithTermQuery.exception:{}", e.getMessage(), e); // 捕获异常并记录
        }
        return null; // 如果出现异常，返回null
    }


    // 批量插入文档
    public static boolean batchInsertDoc(EsIndexInfo esIndexInfo, List<EsSourceData> esSourceDataList) {
        // 日志记录批量新增的文档数量和索引名
        if (log.isInfoEnabled()) {
            log.info("批量新增ES文档数量:" + esSourceDataList.size());
            log.info("索引名:" + esIndexInfo.getIndexName());
        }
        try {
            boolean flag = false; // 标记是否有文档被添加到批处理请求中
            BulkRequest bulkRequest = new BulkRequest(); // 创建批量请求

            // 遍历文档数据列表，将每个文档添加到批量请求中
            for (EsSourceData source : esSourceDataList) {
                String docId = source.getDocId(); // 获取文档ID
                if (StringUtils.isNotBlank(docId)) { // 检查文档ID是否非空
                    IndexRequest indexRequest = new IndexRequest(esIndexInfo.getIndexName()); // 创建索引请求
                    indexRequest.id(docId); // 设置文档ID
                    indexRequest.source(source.getData()); // 设置文档数据
                    bulkRequest.add(indexRequest); // 将索引请求添加到批量请求中
                    flag = true; // 标记已有文档添加到批处理请求
                }
            }

            // 如果批量请求中有文档，执行批量操作
            if (flag) {
                BulkResponse response = getClient(esIndexInfo.getClusterName()).bulk(bulkRequest, COMMON_OPTIONS); // 执行批量请求
                if (response.hasFailures()) { // 检查批量操作是否有失败
                    return false; // 如果有失败，返回false
                }
            }
        } catch (Exception e) {
            log.error("batchInsertDoc.error", e); // 捕获异常并记录
            return false; // 发生异常，返回false
        }

        return true; // 批量插入操作成功
    }

    // 通过查询条件更新文档
    public static boolean updateByQuery(EsIndexInfo esIndexInfo, QueryBuilder queryBuilder, Script script, int batchSize) {
        // 日志记录更新操作的索引名
        if (log.isInfoEnabled()) {
            log.info("通过查询条件更新文档，索引名:" + esIndexInfo.getIndexName());
        }
        try {
            UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(esIndexInfo.getIndexName()); // 创建通过查询更新请求
            updateByQueryRequest.setQuery(queryBuilder); // 设置查询条件
            updateByQueryRequest.setScript(script); // 设置更新时使用的脚本
            updateByQueryRequest.setBatchSize(batchSize); // 设置每批处理的文档数目
            updateByQueryRequest.setAbortOnVersionConflict(false); // 设置遇到版本冲突时是否中止操作

            // 执行通过查询更新操作
            BulkByScrollResponse response = getClient(esIndexInfo.getClusterName()).updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT);
            // 检查操作中是否有失败
            List<BulkItemResponse.Failure> failures = response.getBulkFailures();
            if (!failures.isEmpty()) { // 如果有失败，记录失败信息并返回false
                failures.forEach(failure -> log.error("更新操作中的失败：" + failure.getMessage()));
                return false;
            }
        } catch (Exception e) {
            log.error("updateByQuery.error", e); // 捕获异常并记录
            return false; // 发生异常，返回false
        }
        return true; // 更新操作成功
    }


    /**
     * 使用Elasticsearch的分词器对文本进行分词
     *
     * @param esIndexInfo Elasticsearch索引信息，包含索引名和集群名
     * @param text 要分词的文本
     * @return 分词结果列表，每个元素是一个分词后的词
     * @throws Exception 如果执行分词请求失败或处理响应时出现错误
     */
    public static List<String> getAnalyze(EsIndexInfo esIndexInfo, String text) throws Exception {
        List<String> list = new ArrayList<String>(); // 存储分词结果的列表
        Request request = new Request("GET", "_analyze"); // 创建分词请求，指定方法为GET，路径为_analyze
        JSONObject entity = new JSONObject(); // 创建JSON对象用于设置请求体
        entity.put("analyzer", "ik_smart"); // 设置使用的分词器，这里使用ik_smart分词器
        entity.put("text", text); // 设置要分词的文本
        request.setJsonEntity(entity.toJSONString()); // 将JSON对象转换为字符串并设置为请求体

        // 执行分词请求，获取响应
        Response response = getClient(esIndexInfo.getClusterName()).getLowLevelClient().performRequest(request);
        // 解析响应体，获取分词结果
        JSONObject tokens = JSONObject.parseObject(EntityUtils.toString(response.getEntity()));
        JSONArray arrays = tokens.getJSONArray("tokens"); // 从响应中获取分词结果数组
        // 遍历分词结果数组，提取分词并添加到结果列表中
        for (int i = 0; i < arrays.size(); i++) {
            JSONObject obj = JSON.parseObject(arrays.getString(i)); // 获取每个分词结果的JSON对象
            list.add(obj.getString("token")); // 提取分词结果中的token字段（实际的分词）并添加到列表中
        }
        return list; // 返回分词结果列表
    }



}
